19.1 Agent 架构深度解析
🎯 本节目标
深入理解 DeepAnalyze 自研 Agent 框架的核心实现,包括迭代推理循环、代码执行机制和消息处理流程。
🔄 核心:迭代推理系统
DeepAnalyzeVLLM 类结构
DeepAnalyze 的核心是 DeepAnalyzeVLLM 类,它实现了一个最多 30 轮的迭代推理循环:
python
class DeepAnalyzeVLLM:
"""DeepAnalyze 的核心 Agent 实现"""
def __init__(self, checkpoint_path, api_base="http://localhost:8000"):
self.model_name = checkpoint_path
self.api_base = api_base
self.max_rounds = 30 # 最大迭代轮次
def generate(self, prompt: str, workspace: str) -> dict:
"""
主入口:执行数据分析任务
Args:
prompt: 用户的分析请求
workspace: 工作目录(包含数据文件)
Returns:
包含推理过程和最终答案的字典
"""
messages = [{"role": "user", "content": prompt}]
for round_num in range(self.max_rounds):
# 1. 调用 vLLM API 获取响应
response = self._call_vllm_api(messages)
# 2. 检测并执行代码块
if self._has_code_block(response):
code_blocks = self._extract_code_blocks(response)
for code in code_blocks:
result = self._execute_code(code, workspace)
messages.append({
"role": "execute",
"content": f"<Execute>{result}</Execute>"
})
# 3. 检查是否完成
if self._has_answer(response):
return {
"reasoning": messages,
"answer": self._extract_answer(response)
}
# 4. 将响应添加到消息历史
messages.append({"role": "assistant", "content": response})
return {"reasoning": messages, "answer": "Max rounds reached"}与 LangGraph 的架构对比
DeepAnalyze 架构 LangGraph 架构
───────────────── ──────────────
┌─────┐ ┌─────┐
│User │ │START│
└──┬──┘ └──┬──┘
│ │
▼ ▼
┌────────┐ ┌──────────┐
│ LLM │◄──┐ │ Agent │◄──┐
└────┬───┘ │ └────┬─────┘ │
│ │ │ │
▼ │ ▼ │
┌────────┐ │ ┌──────────┐ │
│Extract │ │ │ Condition│ │
│ Code │ │ │ Edge │ │
└────┬───┘ │ └────┬─────┘ │
│ │ ┌────┴────┐ │
▼ │ │ │ │
┌────────┐ │ ┌────▼───┐ ┌───▼──┐ │
│Execute │───┘ │ Tools │ │ END │ │
└────────┘ └────┬───┘ └──────┘ │
│ │ │
┌────▼────┐ └──────────────┘
│ Answer? │
└────┬────┘
yes │ no
│ └────────────────────┐
▼ │
┌─────────┐ │
│ Return │ │
└─────────┘ │
└───► 继续循环🏷️ 标签系统详解
DeepAnalyze 使用自定义 XML 标签来结构化 LLM 的输出:
标签类型
| 标签 | 用途 | 触发动作 |
|---|---|---|
<Analyze> | 分析过程记录 | 无 |
<Understand> | 数据理解阶段 | 无 |
<Code> | Python 代码块 | 触发代码执行 |
<Execute> | 执行结果 | 由系统生成 |
<Answer> | 最终答案 | 触发终止循环 |
标签提取实现
python
import re
def extract_code_blocks(response: str) -> list[str]:
"""
从响应中提取所有 <Code> 标签内的代码
支持两种格式:
1. <Code>python code</Code>
2. <Code>```python
code
```</Code>
"""
pattern = r"<Code>(.*?)</Code>"
matches = re.findall(pattern, response, re.DOTALL)
code_blocks = []
for match in matches:
# 处理 markdown 代码块格式
if match.strip().startswith("```"):
# 移除 ```python 和 ``` 标记
code = re.sub(r"```\w*\n?", "", match)
code = code.strip()
else:
code = match.strip()
code_blocks.append(code)
return code_blocks
def extract_answer(response: str) -> str:
"""提取 <Answer> 标签内的内容"""
pattern = r"<Answer>(.*?)</Answer>"
match = re.search(pattern, response, re.DOTALL)
return match.group(1).strip() if match else ""
def has_answer(response: str) -> bool:
"""检查是否包含 <Answer> 标签"""
return "<Answer>" in response标签设计的优缺点
优点:
- ✅ 简单直观,易于解析
- ✅ 模型训练时可以强化标签使用
- ✅ 输出结构化,便于后处理
- ✅ 无需依赖外部工具调用 API
缺点:
- ❌ 无法动态注册新工具
- ❌ 所有"工具"只有代码执行一种
- ❌ 不如 function calling 灵活
- ❌ 需要模型训练来学习标签使用
🖥️ 代码执行沙箱
执行机制
DeepAnalyze 使用 subprocess 在隔离环境中执行代码:
python
import subprocess
import tempfile
import os
def execute_code(code: str, workspace: str, timeout: int = 120) -> str:
"""
在沙箱环境中执行 Python 代码
Args:
code: 要执行的 Python 代码
workspace: 工作目录
timeout: 超时时间(秒)
Returns:
执行结果(stdout + stderr)
"""
# 创建临时文件
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.py',
dir=workspace,
delete=False
) as f:
f.write(code)
temp_file = f.name
try:
# 设置环境变量(无头模式绘图)
env = os.environ.copy()
env['MPLBACKEND'] = 'Agg'
# 执行代码
result = subprocess.run(
['python', temp_file],
cwd=workspace,
capture_output=True,
text=True,
timeout=timeout,
env=env
)
# 合并 stdout 和 stderr
output = result.stdout
if result.stderr:
output += f"\n[STDERR]\n{result.stderr}"
return output
except subprocess.TimeoutExpired:
return f"[ERROR] Execution timeout after {timeout} seconds"
except Exception as e:
return f"[ERROR] {type(e).__name__}: {str(e)}"
finally:
# 清理临时文件
if os.path.exists(temp_file):
os.remove(temp_file)异步执行版本
python
import asyncio
async def execute_code_async(
code: str,
workspace: str,
timeout: int = 120
) -> str:
"""异步代码执行,用于 API 服务"""
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.py',
dir=workspace,
delete=False
) as f:
f.write(code)
temp_file = f.name
try:
process = await asyncio.create_subprocess_exec(
'python', temp_file,
cwd=workspace,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env={**os.environ, 'MPLBACKEND': 'Agg'}
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout
)
output = stdout.decode()
if stderr:
output += f"\n[STDERR]\n{stderr.decode()}"
return output
except asyncio.TimeoutError:
process.terminate()
return f"[ERROR] Execution timeout"
finally:
if os.path.exists(temp_file):
os.remove(temp_file)安全考虑
代码执行安全措施:
├── 进程隔离(subprocess)
├── 超时限制(默认 120 秒)
├── 工作目录隔离
├── 环境变量控制
└── ⚠️ 注意:无完整沙箱(如 Docker)
生产环境建议:
├── 使用 Docker 容器隔离
├── 限制网络访问
├── 限制文件系统访问
├── 设置资源限制(CPU/内存)
└── 实施代码审查📨 消息处理流程
消息格式规范化
python
def normalize_message_content(message: dict) -> str:
"""
将 OpenAI 格式的消息内容规范化为纯文本
OpenAI 格式支持:
- 字符串内容: "Hello"
- 列表内容: [{"type": "text", "text": "Hello"}]
"""
content = message.get("content", "")
if isinstance(content, str):
return content
if isinstance(content, list):
texts = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
texts.append(item.get("text", ""))
return "\n".join(texts)
return str(content)构建 vLLM 消息
python
def prepare_vllm_messages(
user_query: str,
workspace_files: list[str]
) -> list[dict]:
"""
构建发送给 vLLM 的消息列表
消息结构:
# Instruction
<用户查询>
# Data
<工作目录中的文件列表>
"""
instruction = f"# Instruction\n{user_query}"
if workspace_files:
data_section = "# Data\n" + "\n".join(
f"- {f}" for f in workspace_files
)
content = f"{instruction}\n\n{data_section}"
else:
content = instruction
return [{"role": "user", "content": content}]📊 报告生成机制
从对话到报告
python
def generate_report(messages: list[dict]) -> str:
"""
从对话历史生成 Markdown 报告
报告结构:
1. 主体:最终 <Answer> 的内容
2. 附录:完整的对话历史(所有标签内容)
"""
report_parts = []
conversation_log = []
for msg in messages:
content = msg.get("content", "")
role = msg.get("role", "")
# 提取各类标签
for tag in ["Analyze", "Understand", "Code", "Execute", "Answer"]:
pattern = f"<{tag}>(.*?)</{tag}>"
matches = re.findall(pattern, content, re.DOTALL)
for match in matches:
conversation_log.append(f"### {tag}\n{match.strip()}")
# 提取最终答案作为报告主体
if "<Answer>" in content:
answer = extract_answer(content)
report_parts.insert(0, answer)
# 组装报告
report = "\n\n".join(report_parts)
report += "\n\n---\n\n## 附录:完整对话历史\n\n"
report += "\n\n".join(conversation_log)
return report🔌 vLLM API 集成
API 调用封装
python
import requests
class VLLMClient:
"""vLLM API 客户端"""
def __init__(self, base_url: str = "http://localhost:8000"):
self.base_url = base_url.rstrip("/")
def chat_completion(
self,
messages: list[dict],
model: str = "deepanalyze-8b",
temperature: float = 0.7,
max_tokens: int = 4096
) -> str:
"""
调用 vLLM 的 chat/completions 端点
"""
response = requests.post(
f"{self.base_url}/v1/chat/completions",
json={
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]流式响应支持
python
def stream_chat_completion(
self,
messages: list[dict],
model: str = "deepanalyze-8b"
) -> Iterator[str]:
"""流式生成响应"""
response = requests.post(
f"{self.base_url}/v1/chat/completions",
json={
"model": model,
"messages": messages,
"stream": True
},
stream=True
)
for line in response.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
data = json.loads(line[6:])
if data != "[DONE]":
delta = data["choices"][0].get("delta", {})
if "content" in delta:
yield delta["content"]🎨 完整工作流示例
python
# 1. 初始化客户端
deepanalyze = DeepAnalyzeVLLM(
checkpoint_path="/models/deepanalyze-8b/",
api_base="http://localhost:8000"
)
# 2. 准备工作目录
workspace = "/data/analysis/"
# 目录中包含: person.csv, enrolled.csv, etc.
# 3. 发送分析请求
prompt = """
请分析工作目录中的学生贷款数据,生成一份包含以下内容的报告:
1. 数据概览和统计摘要
2. 贷款违约率分析
3. 关键风险因素识别
4. 可视化图表
5. 结论和建议
"""
# 4. 执行分析
result = deepanalyze.generate(prompt, workspace)
# 5. 获取结果
print("推理过程:", result["reasoning"])
print("最终答案:", result["answer"])💡 核心洞察
DeepAnalyze Agent 的设计哲学
简单即美德
- 使用简单的 for 循环而非复杂的图结构
- 标签系统比 function calling 更直观
专注于领域
- 只有"代码执行"一种工具
- 但这个工具足够强大,可以做任何数据分析
训练优于工程
- 通过训练让模型学会使用标签
- 而非通过复杂的提示工程
端到端控制
- 从训练到推理全链路可控
- 无第三方依赖的黑盒
与 LangGraph 的本质区别
LangGraph:
├── "声明式" - 定义图结构,框架执行
├── "通用" - 支持任意工具和 LLM
├── "工程" - 通过 Prompt 和工具集成实现能力
└── "灵活" - 运行时可修改行为
DeepAnalyze:
├── "命令式" - 直接编写执行逻辑
├── "专用" - 只支持自己的模型
├── "训练" - 通过训练实现能力
└── "固定" - 行为由模型权重决定接下来: 19.2 训练范式与数据